feat(ha): Control Plane HA #1
…rkers
- Add LeaderElector port interface (internal/core/ports/leader.go)
- Implement PgLeaderElector using pg_try_advisory_lock with 5s heartbeat
- Create LeaderGuard wrapper to ensure singleton workers run on exactly one node
- Wrap 11 singleton workers: LB, AutoScaling, Cron, Container, Accounting, Lifecycle, ReplicaMonitor, ClusterReconciler, Healing, DatabaseFailover, Log
- Add unit tests for leader election and guard behavior
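For context, a minimal sketch of how such an advisory-lock elector can work. The receiver, field names, and heartbeat wiring below are assumptions for illustration, not the PR's exact code; only `pg_try_advisory_lock` and the 5s heartbeat come from the commit message.

```go
package postgres

import (
	"context"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"
)

type elector struct{ pool *pgxpool.Pool } // stand-in for PgLeaderElector

func (e *elector) RunAsLeader(ctx context.Context, lockID int64, fn func(context.Context) error) error {
	// Advisory locks are session-scoped, so pin one dedicated connection.
	conn, err := e.pool.Acquire(ctx)
	if err != nil {
		return err
	}
	defer conn.Release()

	var got bool
	if err := conn.QueryRow(ctx, "SELECT pg_try_advisory_lock($1)", lockID).Scan(&got); err != nil {
		return err
	}
	if !got {
		return nil // another node is leader; caller retries later
	}
	defer conn.Exec(context.Background(), "SELECT pg_advisory_unlock($1)", lockID)

	leaderCtx, cancel := context.WithCancel(ctx)
	defer cancel()
	go func() { // heartbeat: losing the session means losing the lock
		t := time.NewTicker(5 * time.Second)
		defer t.Stop()
		for {
			select {
			case <-leaderCtx.Done():
				return
			case <-t.C:
				if conn.Ping(leaderCtx) != nil {
					cancel() // demote ourselves if the session died
					return
				}
			}
		}
	}()
	return fn(leaderCtx)
}
```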
- Extend TaskQueue port with DurableTaskQueue interface using Redis Streams with consumer groups for exactly-once delivery
- Implement Redis Streams durable queue with EnsureGroup, Receive, Ack, Nack, ReclaimStale methods
- Add ExecutionLedger port interface for idempotent job processing
- Implement PgExecutionLedger using job_executions table with ON CONFLICT DO NOTHING
- Integrate durable queue + ledger into ProvisionWorker, ClusterWorker, PipelineWorker with bounded concurrency
- Add migration 100_create_job_executions
- Add noop implementations for testing
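A sketch of the ledger's acquire step built on ON CONFLICT DO NOTHING, assuming a job_executions(job_key, status, updated_at) shape consistent with the migration named above; the real PgExecutionLedger also handles stale-entry reclaim.

```go
package postgres

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5/pgxpool"
)

type ledger struct{ db *pgxpool.Pool } // stand-in for PgExecutionLedger

func (l *ledger) TryAcquire(ctx context.Context, jobKey string) (bool, error) {
	tag, err := l.db.Exec(ctx, `
        INSERT INTO job_executions (job_key, status, updated_at)
        VALUES ($1, 'running', now())
        ON CONFLICT (job_key) DO NOTHING`, jobKey)
	if err != nil {
		return false, fmt.Errorf("execution ledger acquire %s: %w", jobKey, err)
	}
	// One affected row means this worker owns the job; zero means another
	// worker already recorded it, so the caller should Ack and skip.
	return tag.RowsAffected() == 1, nil
}
```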
…ry utilities
Circuit Breaker:
- Add half-open single-flight: only one probe request allowed at a time
- Add OnStateChange callback (synchronous) for observability
- Add SuccessRequired for multi-success half-open→closed transition
- Add Name and State.String() methods
- Backward compatible with existing NewCircuitBreaker(threshold, timeout)
Bulkhead:
- Add semaphore-based concurrency limiter with configurable wait timeout
- Returns ErrBulkheadFull when limit reached and timeout expires
Retry:
- Add exponential backoff with full jitter
- Configurable ShouldRetry predicate for selective retry
- Context-aware cancellation
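A sketch of the retry shape this commit describes (exponential backoff, full jitter, a ShouldRetry predicate, context-aware cancellation). The option names and signature are assumptions; full jitter means the delay is uniform in [0, backoff], which is also what the review later asks the real implementation to guarantee.

```go
package platform

import (
	"context"
	"math/rand"
	"time"
)

type RetryConfig struct { // assumed option names
	MaxAttempts int
	BaseDelay   time.Duration
	MaxDelay    time.Duration
	ShouldRetry func(error) bool
}

func Retry(ctx context.Context, cfg RetryConfig, fn func() error) error {
	var err error
	for attempt := 0; attempt < cfg.MaxAttempts; attempt++ {
		if err = fn(); err == nil {
			return nil
		}
		if cfg.ShouldRetry != nil && !cfg.ShouldRetry(err) {
			return err // selective retry: give up on non-retryable errors
		}
		// Exponential backoff capped at MaxDelay (shift guarded for overflow)...
		backoff := cfg.MaxDelay
		if shift := uint(attempt); shift < 32 {
			if d := cfg.BaseDelay << shift; d < cfg.MaxDelay {
				backoff = d
			}
		}
		// ...with full jitter: uniform in [0, backoff].
		delay := time.Duration(rand.Int63n(int64(backoff) + 1))
		select {
		case <-time.After(delay):
		case <-ctx.Done():
			return ctx.Err() // context-aware cancellation
		}
	}
	return err
}
```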
…ackends
Add decorator wrappers implementing ports interfaces with resilience patterns:
- ResilientCompute: CB (5 fails/30s) + Bulkhead (20 conc) + Timeouts
- ResilientNetwork: CB (5 fails/30s) + Bulkhead (15 conc) + Timeout (30s)
- ResilientStorage: CB (5 fails/30s) + Bulkhead (10 conc) + Timeouts
- ResilientDNS: CB (5 fails/30s) + Timeout (10s) - no bulkhead needed
- ResilientLB: CB (5 fails/30s) + Timeouts (30s normal, 2m deploy)
Design:
- Ping() bypasses bulkhead (cheap health check) but uses CB
- Type() delegates directly (pure metadata)
- Retry NOT applied at adapter level (dangerous for provisioning)
- All wrappers have configurable options with sensible defaults
- SuccessRequired: 2 for half-open→closed (extra safety)
Add comprehensive tests for ResilientCompute (passthrough, circuit trip, bulkhead limits, timeout, unwrap, ping bypass).
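A minimal sketch of that layering (bulkhead outermost, then circuit breaker, then a per-call timeout). The receiver and helper name `callProtected` echo the review excerpts below, but the exact shapes are assumptions; the PR's wrappers add configurable options and per-method timeouts on top.

```go
package platform

import (
	"context"
	"time"
)

type resilientCall struct {
	bh *Bulkhead       // bounded concurrency (outermost)
	cb *CircuitBreaker // fail-fast on persistent errors
}

// callProtected composes bulkhead -> circuit breaker -> timeout around fn.
func (r *resilientCall) callProtected(ctx context.Context, timeout time.Duration, fn func(context.Context) error) error {
	return r.bh.Execute(ctx, func() error {
		return r.cb.Execute(func() error {
			ctx2, cancel := context.WithTimeout(ctx, timeout)
			defer cancel()
			return fn(ctx2) // innermost: the wrapped backend call
		})
	})
}
```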
- Wrap all backends with resilient decorators in main.go: NewResilientCompute, NewResilientStorage, NewResilientNetwork, NewResilientLB
- Wrap DNS backend with resilient decorator in dependencies.go
- Create PgLeaderElector and wire into ServiceConfig
- Update ProvisionWorker, ClusterWorker, PipelineWorker to use:
  * DurableTaskQueue (Redis Streams with consumer groups)
  * ExecutionLedger for idempotent job processing
  * Bounded concurrency via semaphore (provision=20, cluster=10, pipeline=5)
- Update workers to use Receive/Ack/Nack pattern for exactly-once delivery
- Add role validation tests
HA Drills (ha_drills_test.go):
1. Circuit breaker trip and recovery (validates 3 state transitions)
2. Bulkhead saturation and graceful rejection
3. Resilient adapter end-to-end (CB + bulkhead + timeout compose)
4. Retry backoff and context cancellation
5. Half-open single-flight validation
Release Gates (release_gates_test.go) - validate SLOs:
1. Fail-fast latency <1ms when circuit is open
2. Bulkhead isolation (saturated compute doesn't affect network)
3. Circuit recovery within resetTimeout window
4. Retry idempotency (exactly MaxAttempts executions)
5. Independent circuit breakers don't interfere
Total: 13 new tests validating HA invariants.
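As an illustration of gate 1, a fail-fast check might look like the sketch below. The constructor and error names follow excerpts quoted later in this review; the import path, package name, and exact thresholds are assumptions, not the actual test.

```go
package drills

import (
	"errors"
	"testing"
	"time"

	"github.com/stretchr/testify/require"

	"example.com/project/internal/platform" // hypothetical module path
)

func TestOpenCircuitFailsFast(t *testing.T) {
	// Threshold 1 and a long reset window: one failure trips the breaker
	// and keeps it open for the rest of the test.
	cb := platform.NewCircuitBreaker(1, time.Minute)
	_ = cb.Execute(func() error { return errors.New("boom") })

	start := time.Now()
	err := cb.Execute(func() error { return nil }) // must be rejected, not run
	require.ErrorIs(t, err, platform.ErrCircuitOpen)
	require.Less(t, time.Since(start), time.Millisecond, "open circuit should fail fast")
}
```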
Caution: Review failed. The pull request is closed.
📝 Walkthrough
The PR adds resilience primitives (circuit breaker, bulkhead, retry), durable task queues, an execution ledger, Postgres leader election, resilient backend wrappers (compute, network, storage, DNS, LB), and worker refactors to use durable messaging and leader-guarded singleton execution; wiring is applied to startup/service setup and tests.
Sequence Diagram(s)
sequenceDiagram
participant App as Application
participant Elector as LeaderElector
participant Guard as LeaderGuard
participant Worker as Worker (Runner)
participant Queue as DurableQueue
participant Ledger as ExecutionLedger
participant Backend as Backend (Compute/Net/Storage)
App->>Elector: RunAsLeader(ctx, key, runFn)
Elector->>Guard: grant leadership / notify
Guard->>Worker: Run(ctx)
Worker->>Queue: EnsureGroup(queue, group)
loop process messages
Worker->>Queue: Receive(queue, group, consumer)
Queue-->>Worker: DurableMessage{ID, Payload}
Worker->>Ledger: TryAcquire(jobKey, staleThreshold)
alt acquired
Worker->>Backend: execute job
alt success
Worker->>Ledger: MarkComplete(jobKey, result)
Worker->>Queue: Ack(queue, group, msgID)
else failure
Worker->>Ledger: MarkFailed(jobKey, reason)
Worker->>Queue: Nack(queue, group, msgID)
end
else already processed
Worker->>Queue: Ack(queue, group, msgID)
end
end
sequenceDiagram
participant Caller as Client
participant Wrapper as ResilientWrapper
participant Bulk as Bulkhead
participant CB as CircuitBreaker
participant Inner as InnerBackend
Caller->>Wrapper: Operation(ctx,args)
Wrapper->>Bulk: Acquire slot
alt slot granted
Wrapper->>CB: allowRequest()
alt allowed
Wrapper->>Wrapper: apply timeout
Wrapper->>Inner: invoke
Inner-->>Wrapper: result / error
alt success
Wrapper->>CB: recordSuccess()
Wrapper-->>Caller: return result
else error
Wrapper->>CB: recordFailure()
Wrapper-->>Caller: return error
end
else open
Wrapper-->>Caller: ErrCircuitOpen (fail-fast)
end
Wrapper->>Bulk: release slot
else no slot
Wrapper-->>Caller: ErrBulkheadFull
end
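Concretely, the consume loop in the first diagram maps to a worker skeleton like the sketch below. The port shapes and helper names (`jobKeyFor`, `handle`) are reconstructions from the diagram, not the PR's exact signatures; the real workers add reclaim loops and bounded concurrency on top.

```go
package workers

import (
	"context"
	"time"
)

// Stand-ins reconstructed from the diagram; the real ports in
// internal/core/ports may differ in shape.
type DurableMessage struct {
	ID      string
	Payload []byte
}

type queue interface {
	Receive(ctx context.Context, queueName, group, consumer string) (*DurableMessage, error)
	Ack(ctx context.Context, queueName, group, msgID string) error
	Nack(ctx context.Context, queueName, group, msgID string) error
}

type ledger interface {
	TryAcquire(ctx context.Context, jobKey string, stale time.Duration) (bool, error)
	MarkComplete(ctx context.Context, jobKey, result string) error
	MarkFailed(ctx context.Context, jobKey, reason string) error
}

func consumeLoop(ctx context.Context, q queue, l ledger, queueName, group, consumer string,
	stale time.Duration, jobKeyFor func(*DurableMessage) string,
	handle func(context.Context, []byte) error) {
	for ctx.Err() == nil {
		msg, err := q.Receive(ctx, queueName, group, consumer)
		if err != nil || msg == nil {
			continue // real workers back off here
		}
		jobKey := jobKeyFor(msg)
		acquired, err := l.TryAcquire(ctx, jobKey, stale)
		if err != nil {
			_ = q.Nack(ctx, queueName, group, msg.ID)
			continue
		}
		if !acquired { // another worker already ran this job
			_ = q.Ack(ctx, queueName, group, msg.ID)
			continue
		}
		if err := handle(ctx, msg.Payload); err != nil {
			_ = l.MarkFailed(ctx, jobKey, err.Error())
			_ = q.Nack(ctx, queueName, group, msg.ID) // redelivered via reclaim
			continue
		}
		_ = l.MarkComplete(ctx, jobKey, "ok")
		_ = q.Ack(ctx, queueName, group, msg.ID)
	}
}
```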
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~110 minutes
🚥 Pre-merge checks: 2 passed, 1 failed (1 warning)
Actionable comments posted: 20
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
internal/platform/circuit_breaker.go (1)
104-116: ⚠️ Potential issue | 🟠 Major
A panic during request execution leaves `halfOpenInFlight` set to true, wedging the circuit breaker.
If `fn()` panics after `allowRequest()` sets `halfOpenInFlight=true`, neither `recordFailure()` nor `recordSuccess()` runs. The flag remains true permanently. All subsequent calls to `Execute()` will return `ErrCircuitOpen` because `allowRequest()` rejects requests when `halfOpenInFlight` is true (lines 131–132, 140–141). The breaker is locked until manually reset.
Suggested fix
 func (cb *CircuitBreaker) Execute(fn func() error) error {
 	if !cb.allowRequest() {
 		return ErrCircuitOpen
 	}
+	defer func() {
+		if r := recover(); r != nil {
+			cb.recordFailure()
+			panic(r)
+		}
+	}()
+
 	err := fn()
 	if err != nil {
 		cb.recordFailure()
 		return err
 	}
 	cb.recordSuccess()
 	return nil
 }

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/platform/circuit_breaker.go` around lines 104 - 116, In Execute, a panic inside fn() can leave the halfOpenInFlight flag set and permanently open the circuit; wrap the fn() invocation with a defer-recover that on panic calls cb.recordFailure() (or otherwise clears/resets halfOpenInFlight via the same internal path used by recordFailure), then re-raises the panic so behavior is unchanged; keep the existing error and success flows (calls to recordFailure() and recordSuccess()) intact. Target symbols: Execute, allowRequest, recordFailure, recordSuccess, and the halfOpenInFlight flag.
🧹 Nitpick comments (5)
internal/workers/leader_guard_test.go (1)
130-143: Remove unused `inner` variable.
The `inner` variable is declared but never used since `countingRunner` is passed to `NewLeaderGuard` instead. The comment on line 143 acknowledges this but the variable should simply be removed.
🧹 Cleanup unused variable
 }
-	inner := &mockRunner{}
-	// Override mockRunner to not block
 	countingRunner := &countingMockRunner{}
 	guard := NewLeaderGuard(elector, "test:worker", countingRunner, newTestLogger())
@@ ... @@
 	wg.Wait()
-	_ = inner // unused, countingRunner is used instead
 	runs := countingRunner.runCalled.Load()

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/workers/leader_guard_test.go` around lines 130 - 143, Remove the unused local variable `inner` of type `*mockRunner` from the test: it's declared but never referenced and `countingRunner` is passed into `NewLeaderGuard` instead; delete the `inner := &mockRunner{}` line and the trailing comment referencing it so the test only constructs `countingRunner`, calls `NewLeaderGuard(elector, "test:worker", countingRunner, newTestLogger())`, and runs `guard.Run(ctx, wg)` without an unused variable.
internal/workers/leader_guard.go (1)
69-84: Consider adding backoff on leader election errors.
If `RunAsLeader` returns immediately with an error (e.g., database connectivity issues), the loop will retry immediately without any delay, potentially causing a hot loop that wastes CPU and floods logs.
♻️ Suggested backoff on error
 if err != nil {
 	if ctx.Err() != nil {
 		// Parent context cancelled — clean shutdown
 		g.logger.Info("leader guard shutting down", "key", g.key)
 		return
 	}
 	g.logger.Error("leader election error, will retry", "key", g.key, "error", err)
+	// Backoff before retrying to avoid hot loop on persistent errors
+	select {
+	case <-time.After(5 * time.Second):
+	case <-ctx.Done():
+		return
+	}
 }

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/workers/leader_guard.go` around lines 69 - 84, The loop in the leader guard currently retries immediately when RunAsLeader returns an error, which can spin; modify the loop around the RunAsLeader error path (the block that checks err, ctx, g.logger, g.key) to apply a backoff before retrying: implement a context-aware exponential backoff with jitter (capped maximum) and use select on ctx.Done() vs time.After to abort early, logging the backoff duration via g.logger; ensure the backoff is only applied for transient errors (when ctx.Err() == nil) so clean shutdowns are unaffected.
internal/workers/cluster_worker.go (1)
181-201: Consider logging or handling repo.Update errors.
The `_ = w.repo.Update(ctx, cluster)` calls discard errors. While this is a common pattern for "best effort" status updates, persistent update failures could leave clusters in stale states without visibility.
📝 Proposed improvement for observability
-	_ = w.repo.Update(ctx, cluster)
+	if err := w.repo.Update(ctx, cluster); err != nil {
+		w.logger.Warn("failed to update cluster status", "cluster_id", cluster.ID, "error", err)
+	}

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/workers/cluster_worker.go` around lines 181 - 201, The handleProvision function currently ignores errors from w.repo.Update (several occurrences) which can hide persistent DB failures; change each "_ = w.repo.Update(ctx, cluster)" to capture the returned error, log it with context (include "cluster_id" and the error) and decide per call whether to escalate (return the error) or treat as best-effort: for status transitions before calling provision you can log and continue, but for updates after a provisioning failure or after marking running/failed you should log the failure and return it to avoid leaving the cluster in a stale state; update references: ClusterWorker.handleProvision and w.repo.Update call sites inside that function to implement this logging/return behavior.
internal/repositories/redis/durable_task_queue.go (1)
209-219: Use `strings.Contains` instead of custom substring search.
The `containsSubstring` function reimplements `strings.Contains` from the standard library. The custom implementation adds maintenance burden without providing benefits.
♻️ Proposed simplification
+import "strings" + func containsBusyGroup(s string) bool { - return len(s) >= 9 && (s[:9] == "BUSYGROUP" || containsSubstring(s, "BUSYGROUP")) + return strings.Contains(s, "BUSYGROUP") } - -func containsSubstring(s, sub string) bool { - for i := 0; i+len(sub) <= len(s); i++ { - if s[i:i+len(sub)] == sub { - return true - } - } - return false -}🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/repositories/redis/durable_task_queue.go` around lines 209 - 219, Replace the custom containsSubstring and manual prefix/substring checks with the standard library: import "strings", change containsBusyGroup(s string) to return strings.HasPrefix(s, "BUSYGROUP") || strings.Contains(s, "BUSYGROUP"), and delete the containsSubstring function; this removes the redundant len checks and uses the built-in, well-tested string utilities.
internal/platform/resilient_storage.go (1)
151-156: Run `Ping` through the bulkhead too.
Health probes currently bypass the concurrency limiter, so a probe storm can still saturate the storage backend while normal calls are capped. Routing `Ping` through `callProtected` keeps overload behavior consistent.
♻️ Small consistency fix
 func (r *ResilientStorage) Ping(ctx context.Context) error {
-	return r.cb.Execute(func() error {
-		ctx2, cancel := context.WithTimeout(ctx, 5*time.Second)
-		defer cancel()
-		return r.inner.Ping(ctx2)
-	})
+	return r.callProtected(ctx, 5*time.Second, func(ctx context.Context) error {
+		return r.inner.Ping(ctx)
+	})
 }

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/platform/resilient_storage.go` around lines 151 - 156, The Ping method currently calls r.cb.Execute directly, bypassing the existing bulkhead-wrapper helper; change Ping to use the same callProtected helper as other operations so health probes go through the concurrency limiter. Specifically, in ResilientStorage.Ping replace the r.cb.Execute(...) block with a call to r.callProtected (or the existing callProtected method) passing ctx and a closure that creates the 5s timeout context and calls r.inner.Ping(ctx2), preserving the timeout and cancel semantics and returning the inner error so Ping is subject to the same bulkhead behavior as other methods.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@cmd/api/main_test.go`:
- Around line 200-233: The test TestRunApplicationDefaultsToAllRole should
assert the shutdown path: modify the fake
StartHTTPServer/ShutdownHTTPServer/NotifySignals setup used around
runApplication so StartHTTPServer blocks on a channel until a shutdown is
triggered, then verify ShutdownHTTPServer is invoked (e.g., set a flag or send
on a channel inside ShutdownHTTPServer). Keep the same pattern used in
TestRunApplicationApiRoleStartsAndShutsDown: have NotifySignals send SIGTERM
after a short sleep, StartHTTPServer wait for a "started" signal and then block
until a "stop" signal from ShutdownHTTPServer, and assert serverStarted and that
ShutdownHTTPServer ran to ensure the full lifecycle for runApplication is
exercised when ROLE is empty.
In `@internal/core/ports/task_queue.go`:
- Around line 50-54: The Nack contract on the TaskQueue interface currently
allows implementations to "explicitly re-queue" while the original delivery is
still pending, which can create duplicate live copies; change the Nack
documentation and signature contract on Nack(ctx context.Context, queueName,
groupName, messageID string) error to explicitly forbid re-queuing or creating a
new live message ID and restrict it to only relinquishing the current delivery
(e.g., by releasing visibility/pending entry); add a new explicit method (e.g.,
Requeue(ctx context.Context, queueName, groupName, messageID string) error) that
transfers ownership/creates a new delivery and document its semantics separately
so implementations stop forking messages in Nack and instead implement the new
Requeue API for immediate re-enqueue semantics, updating any implementing types
and callers to follow the new contract.
In `@internal/drills/ha_drills_test.go`:
- Around line 359-385: The test currently races on time.Sleep when trying to
ensure the probe goroutine is the half-open probe and when waiting for the
circuit to close; replace both sleeps with explicit synchronization: add a
probeStarted channel that the goroutine running cb.Execute signals (close or
send) immediately after it becomes the probe (before blocking on probeDone) and
have the main test wait for probeStarted before issuing the looped requests, and
instead of sleeping after closing probeDone wait for either the probe to finish
(probeDone completion) or poll/wait on cb.GetState() until it equals
platform.StateClosed (or use a done channel signaled by the probe goroutine
after it returns) before asserting the closed state so the test is deterministic
and not timing-dependent.
In `@internal/drills/release_gates_test.go`:
- Around line 69-88: The test uses time.Sleep to assume the two goroutines have
acquired compute slots before calling bhCompute.Execute, which is flaky; change
the synchronization so each worker signals after it successfully enters
bhCompute.Execute (e.g., use a startedCh or another WaitGroup) and only close
blockCh / perform the rejection check after receiving two signals that the
goroutines are inside the Execute callback; update the goroutines that call
bhCompute.Execute to send on startedCh (or Done() on the second WaitGroup)
immediately after entering the Execute callback and before blocking on blockCh
so the subsequent call to bhCompute.Execute reliably observes a saturated
bulkhead.
In `@internal/platform/bulkhead_test.go`:
- Around line 104-117: The test currently uses time.Sleep to wait for the
goroutine that calls bh.Execute to finish; instead make the goroutine signal
when Execute returns and have the test wait on that signal so the final assert
is deterministic. Concretely, in TestBulkheadAvailable change the anonymous
goroutine that calls bh.Execute(...) to close a new finished channel (or call
wg.Done on a sync.WaitGroup) after bh.Execute returns, then replace
time.Sleep(10 * time.Millisecond) with a receive on that finished channel (or
wg.Wait()) so the test only proceeds to the last assert after bh.Execute has
completed.
- Around line 63-64: Replace the non-fatal assertion with a fatal one: change
assert.ErrorIs(t, err, ErrBulkheadFull) to require.ErrorIs(t, err,
ErrBulkheadFull) for the bh.Execute(...) rejection checks so the test stops
immediately on failure; apply the same replacement for the other identical
assertion later in the file (the second bh.Execute check around the other
blocking assertion).
In `@internal/platform/bulkhead.go`:
- Around line 52-71: The acquire function can grab a semaphore slot even if ctx
is already canceled; update Bulkhead.acquire to first check for pre-canceled
contexts (e.g., if ctx.Err() != nil return ErrBulkheadFull) before creating the
timer or entering any select, and similarly perform the same early-cancel check
immediately before the timed select branch so Execute will never run fn() for an
already-canceled context.
In `@internal/platform/circuit_breaker_test.go`:
- Around line 110-128: The test currently uses time.Sleep after closing
probeDone which is flaky; replace the sleep by synchronizing the probe goroutine
completion (e.g., add a probeFinished channel or sync.WaitGroup and have the
goroutine that calls cb.Execute signal it when the probe returns) so that after
close(probeDone) the test waits for the probe goroutine to finish before
asserting cb.GetState() equals StateClosed; adjust references to
probeStarted/probeDone and the anonymous goroutine that calls cb.Execute to
signal probeFinished when the probe completes to make the assertion
deterministic.
In `@internal/platform/resilient_compute_test.go`:
- Around line 209-227: The test currently uses ready/time.Sleep which only
proves goroutines started, not that StartInstance actually acquired the bulkhead
slot; modify mockCompute.wait to signal a new in-flight invocation by sending on
a channel (e.g., add entered chan struct{} to mockCompute and have wait
non-blockingly send into entered when called), initialize mock.entered =
make(chan struct{}, 2) in the test, then replace the time.Sleep/ready
synchronization: wait to receive two signals from mock.entered to ensure the
first two goroutines have acquired the semaphore before releasing others or
calling Ping, and keep checking for ErrBulkheadFull from StartInstance as
before.
In `@internal/platform/resilient_compute.go`:
- Around line 140-147: GetInstanceLogs (and similarly GetInstanceStats)
currently returns an io.ReadCloser whose underlying context is cancelled when
callProtected's deferred cancel runs; fix by creating a wrapper
cancelOnCloseReadCloser type that embeds io.ReadCloser and holds the cancel
context (cancel context created by callProtected), return an instance of that
wrapper from inside callProtected so the wrapper's Close invokes the original
ReadCloser.Close and then calls cancel(), and ensure callProtected does not
cancel the context until the wrapper's Close is invoked; update
ResilientCompute.GetInstanceLogs and ResilientCompute.GetInstanceStats to use
cancelOnCloseReadCloser to defer cancellation until the stream is closed.
In `@internal/platform/retry.go`:
- Around line 81-95: The backoffDelay function implements equal jitter (flooring
at base/2) instead of the promised full jitter; change the jitter range so the
randomized delay is uniform in [0, calculated] rather than [base/2, calculated].
In function backoffDelay use a random value from 0 up to calculated (e.g.,
rand.Int63n(int64(calculated)+1) or equivalent) and return that as the jittered
duration, keeping the earlier exponential calculation and max clamping intact;
also update the test bounds in internal/platform/retry_test.go to expect values
in [0, calculated] (or adjust assertions accordingly).
In `@internal/repositories/postgres/execution_ledger.go`:
- Around line 80-90: The "failed" branch ignores the Exec result and always
returns true, causing a race where multiple workers think they reclaimed the
job; change the code in the "failed" case to capture the Exec result (e.g., tag,
err := l.db.Exec(ctx, ... , jobKey)), check tag.RowsAffected() and only return
true when RowsAffected() > 0, otherwise return false, nil; preserve the existing
error wrapping (fmt.Errorf("execution ledger retry %s: %w", jobKey, err)) for
Exec errors so behavior matches the "running" optimistic-locking path.
- Around line 45-48: Replace the direct comparison against pgx.ErrNoRows with
errors.Is to handle wrapped errors: in the execution ledger insert error
handling (the block that checks err and references pgx.ErrNoRows and jobKey),
change the conditional from "err != nil && err != pgx.ErrNoRows" to "err != nil
&& !errors.Is(err, pgx.ErrNoRows)" so wrapped pgx.ErrNoRows values are
recognized and the function returns the same fmt.Errorf(...) only for other
errors.
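The `errors.Is` pattern those prompts ask for, shown in a hypothetical lookup helper; the query, table, and column names here are placeholders, only the wrapped-error check is the point.

```go
package postgres

import (
	"context"
	"errors"
	"fmt"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
)

// lookupStatus is hypothetical; it illustrates matching wrapped pgx.ErrNoRows.
func lookupStatus(ctx context.Context, db *pgxpool.Pool, jobKey string) (string, error) {
	var status string
	err := db.QueryRow(ctx, `SELECT status FROM job_executions WHERE job_key = $1`, jobKey).Scan(&status)
	if err != nil && !errors.Is(err, pgx.ErrNoRows) { // also matches wrapped ErrNoRows
		return "", fmt.Errorf("execution ledger lookup %s: %w", jobKey, err)
	}
	return status, nil
}
```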
In `@internal/repositories/postgres/leader_elector.go`:
- Around line 40-45: The conversion from uint64 to int64 in keyToLockID triggers
gosec G115; change the mask to use a typed constant and explicit uint64-to-int64
conversion to make intent clear and avoid overflow warnings: replace the literal
mask with uint64(math.MaxInt64) and compute int64(h.Sum64() &
uint64(math.MaxInt64)), adding an import for math; this keeps the value
non-negative and makes the cast explicit for the linter in the keyToLockID
function.
In `@internal/repositories/redis/durable_task_queue.go`:
- Around line 86-91: The XDel error is currently ignored in the Dequeue path;
change the call in the Dequeue implementation (where q.client.XDel(ctx,
queueName, msg.ID) is invoked) to capture the returned error and handle it — at
minimum log a warning or error including queueName and msg.ID (and optionally
payload) using the queue's logger (e.g., q.logger or the existing logging
facility), so transient Redis failures are visible; consider retaining behavior
(still return payload) but ensure the deletion error is logged with context so
duplicates can be diagnosed.
In `@internal/workers/cluster_worker.go`:
- Around line 65-68: The reclaimLoop goroutine is not tracked by the worker's
lifecycle; update Run to increment the worker WaitGroup before starting
reclaimLoop (call wg.Add(1)), start reclaimLoop in a small wrapper that defers
wg.Done(), and ensure Run waits for wg.Wait() after ctx.Done() so in-flight
reclaim operations using sem (the channel made with clusterMaxWorkers) finish
before returning; alternatively add wg.Done() inside the top of reclaimLoop and
call wg.Wait() in Run after cancellation.
In `@internal/workers/pipeline_worker.go`:
- Around line 21-29: The ReclaimStale handling currently acks messages when
TryAcquire returns !acquired, which drops builds while the ledger lease is still
active (pipelineStaleThreshold). Change the logic in ReclaimStale/where
TryAcquire is checked so you only call Ack when the ledger entry truly indicates
completion; for the active-lease case (state == "running" and timestamp within
pipelineStaleThreshold) do not Ack—either Nack/leave pending or extend
TryAcquire to return explicit states so you can distinguish completed vs
running; use TryAcquire, the ledger entry timestamp, and pipelineStaleThreshold
to decide (only Ack when status==completed or timestamp older than
pipelineStaleThreshold).
- Around line 132-139: The current branch treats all nil returns from
loadBuildAndPipeline as "not_found" and acks the message; change
loadBuildAndPipeline to return an explicit error (e.g., (build, pipeline, err))
or otherwise expose why it returned nil, then update the handling so only true
not-found results call ledger.MarkComplete(..., "not_found") and Ack(...); for
transient DB/backend errors (err != nil and not a not-found sentinel) do not
mark complete or ack—let the message remain/retry or requeue—and log/propagate
the error appropriately; reference loadBuildAndPipeline, build, pipeline, err,
ledger.MarkComplete, taskQueue.Ack, pipelineQueueName, pipelineGroup, msg.ID,
workerCtx, and jobKey to locate the changed code.
In `@internal/workers/provision_worker_test.go`:
- Around line 166-172: The tests currently only assert the expected side was
touched (using tt.wantAcked/tt.wantNacked and fq.acked/fq.nacked) but don’t
assert the opposite side remained untouched; update the assertions so that when
tt.wantAcked is true you also assert fq.nacked is empty, when tt.wantNacked is
true you also assert fq.acked is empty, and when neither flag is set assert both
fq.acked and fq.nacked are empty; keep the existing buf.String() log assertion
and use the same fq.acked/fq.nacked symbols to locate the checks to modify.
In `@internal/workers/provision_worker.go`:
- Around line 19-28: The reclaim window (provisionReclaimMs) is shorter than the
idempotency ledger lease (provisionStaleThreshold), which lets peers reclaim
messages and ack them while ledger entries still show "running"; update the
handling so that when a consumer sees acquired==false it does NOT
unconditionally ack the message — instead query the idempotency ledger entry for
the job and only ack if the ledger shows the job is completed or the ledger
entry is stale; otherwise leave the message unacked for the original worker to
finish. Additionally, either increase provisionReclaimMs to be >=
provisionStaleThreshold (plus a small buffer) or document the dependant timing,
and apply the same change to the other ack path referenced in the file (the code
handling acquired checks and acking in the provision worker loop).
---
Outside diff comments:
In `@internal/platform/circuit_breaker.go`:
- Around line 104-116: In Execute, a panic inside fn() can leave the
halfOpenInFlight flag set and permanently open the circuit; wrap the fn()
invocation with a defer-recover that on panic calls cb.recordFailure() (or
otherwise clears/resets halfOpenInFlight via the same internal path used by
recordFailure), then re-raises the panic so behavior is unchanged; keep the
existing error and success flows (calls to recordFailure() and recordSuccess())
intact. Target symbols: Execute, allowRequest, recordFailure, recordSuccess, and
the halfOpenInFlight flag.
---
Nitpick comments:
In `@internal/platform/resilient_storage.go`:
- Around line 151-156: The Ping method currently calls r.cb.Execute directly,
bypassing the existing bulkhead-wrapper helper; change Ping to use the same
callProtected helper as other operations so health probes go through the
concurrency limiter. Specifically, in ResilientStorage.Ping replace the
r.cb.Execute(...) block with a call to r.callProtected (or the existing
callProtected method) passing ctx and a closure that creates the 5s timeout
context and calls r.inner.Ping(ctx2), preserving the timeout and cancel
semantics and returning the inner error so Ping is subject to the same bulkhead
behavior as other methods.
In `@internal/repositories/redis/durable_task_queue.go`:
- Around line 209-219: Replace the custom containsSubstring and manual
prefix/substring checks with the standard library: import "strings", change
containsBusyGroup(s string) to return strings.HasPrefix(s, "BUSYGROUP") ||
strings.Contains(s, "BUSYGROUP"), and delete the containsSubstring function;
this removes the redundant len checks and uses the built-in, well-tested string
utilities.
In `@internal/workers/cluster_worker.go`:
- Around line 181-201: The handleProvision function currently ignores errors
from w.repo.Update (several occurrences) which can hide persistent DB failures;
change each "_ = w.repo.Update(ctx, cluster)" to capture the returned error, log
it with context (include "cluster_id" and the error) and decide per call whether
to escalate (return the error) or treat as best-effort: for status transitions
before calling provision you can log and continue, but for updates after a
provisioning failure or after marking running/failed you should log the failure
and return it to avoid leaving the cluster in a stale state; update references:
ClusterWorker.handleProvision and w.repo.Update call sites inside that function
to implement this logging/return behavior.
In `@internal/workers/leader_guard_test.go`:
- Around line 130-143: Remove the unused local variable `inner` of type
`*mockRunner` from the test: it's declared but never referenced and
`countingRunner` is passed into `NewLeaderGuard` instead; delete the `inner :=
&mockRunner{}` line and the trailing comment referencing it so the test only
constructs `countingRunner`, calls `NewLeaderGuard(elector, "test:worker",
countingRunner, newTestLogger())`, and runs `guard.Run(ctx, wg)` without an
unused variable.
In `@internal/workers/leader_guard.go`:
- Around line 69-84: The loop in the leader guard currently retries immediately
when RunAsLeader returns an error, which can spin; modify the loop around the
RunAsLeader error path (the block that checks err, ctx, g.logger, g.key) to
apply a backoff before retrying: implement a context-aware exponential backoff
with jitter (capped maximum) and use select on ctx.Done() vs time.After to abort
early, logging the backoff duration via g.logger; ensure the backoff is only
applied for transient errors (when ctx.Err() == nil) so clean shutdowns are
unaffected.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 666fa7da-1822-43f7-a4d7-ca228b04d44e
📒 Files selected for processing (35)
cmd/api/main.go
cmd/api/main_test.go
internal/api/setup/dependencies.go
internal/core/ports/execution_ledger.go
internal/core/ports/leader.go
internal/core/ports/task_queue.go
internal/drills/ha_drills_test.go
internal/drills/release_gates_test.go
internal/platform/bulkhead.go
internal/platform/bulkhead_test.go
internal/platform/circuit_breaker.go
internal/platform/circuit_breaker_test.go
internal/platform/resilient_compute.go
internal/platform/resilient_compute_test.go
internal/platform/resilient_dns.go
internal/platform/resilient_lb.go
internal/platform/resilient_network.go
internal/platform/resilient_storage.go
internal/platform/retry.go
internal/platform/retry_test.go
internal/repositories/noop/adapters.go
internal/repositories/postgres/execution_ledger.go
internal/repositories/postgres/leader_elector.go
internal/repositories/postgres/leader_elector_test.go
internal/repositories/postgres/migrations/100_create_job_executions.down.sql
internal/repositories/postgres/migrations/100_create_job_executions.up.sql
internal/repositories/redis/durable_task_queue.go
internal/repositories/redis/durable_task_queue_test.go
internal/workers/cluster_worker.go
internal/workers/cluster_worker_test.go
internal/workers/leader_guard.go
internal/workers/leader_guard_test.go
internal/workers/pipeline_worker.go
internal/workers/provision_worker.go
internal/workers/provision_worker_test.go
// Nack signals that the consumer failed to process the message.
// The implementation should make the message available for redelivery
// (e.g. by not acknowledging it and letting the pending-entry timeout
// handle redelivery, or by explicitly re-queuing).
Nack(ctx context.Context, queueName, groupName, messageID string) error
🛠️ Refactor suggestion | 🟠 Major
Tighten Nack so implementations cannot fork a message.
Allowing a backend to “explicitly re-queue” here while the original delivery is still pending creates a second live copy with a different message ID. That makes Ack/reclaim semantics ambiguous and works against the no-duplicate-destructive-actions goal. Nack should only relinquish the current delivery; immediate requeue should be a separate API with explicit ownership transfer.
📝 Suggested contract wording
// Nack signals that the consumer failed to process the message.
- // The implementation should make the message available for redelivery
- // (e.g. by not acknowledging it and letting the pending-entry timeout
- // handle redelivery, or by explicitly re-queuing).
+ // Nack must not create a second live copy of the same job.
+ // For consumer-group backends, it should relinquish the current delivery
+ // and let reclaim/redelivery hand it to another consumer.
 Nack(ctx context.Context, queueName, groupName, messageID string) error
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/core/ports/task_queue.go` around lines 50 - 54, The Nack contract on
the TaskQueue interface currently allows implementations to "explicitly
re-queue" while the original delivery is still pending, which can create
duplicate live copies; change the Nack documentation and signature contract on
Nack(ctx context.Context, queueName, groupName, messageID string) error to
explicitly forbid re-queuing or creating a new live message ID and restrict it
to only relinquishing the current delivery (e.g., by releasing
visibility/pending entry); add a new explicit method (e.g., Requeue(ctx
context.Context, queueName, groupName, messageID string) error) that transfers
ownership/creates a new delivery and document its semantics separately so
implementations stop forking messages in Nack and instead implement the new
Requeue API for immediate re-enqueue semantics, updating any implementing types
and callers to follow the new contract.
-	sem := make(chan struct{}, maxConcurrentClusts)
+	sem := make(chan struct{}, clusterMaxWorkers)
+	go w.reclaimLoop(ctx, sem)
Consider tracking reclaimLoop goroutine for graceful shutdown.
The reclaimLoop goroutine is started but not tracked in the WaitGroup. While the context cancellation will stop the loop, the main Run method returns immediately after ctx.Done() without waiting for reclaimLoop to finish. This could cause in-flight reclaim operations to be interrupted.
🛡️ Proposed fix to track reclaimLoop
func (w *ClusterWorker) Run(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
// ...
sem := make(chan struct{}, clusterMaxWorkers)
- go w.reclaimLoop(ctx, sem)
+ reclaimWg := &sync.WaitGroup{}
+ reclaimWg.Add(1)
+ go func() {
+ defer reclaimWg.Done()
+ w.reclaimLoop(ctx, sem)
+ }()
for {
select {
case <-ctx.Done():
w.logger.Info("stopping cluster worker")
+            reclaimWg.Wait()
             return

🤖 Prompt for AI Agents
return🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/workers/cluster_worker.go` around lines 65 - 68, The reclaimLoop
goroutine is not tracked by the worker's lifecycle; update Run to increment the
worker WaitGroup before starting reclaimLoop (call wg.Add(1)), start reclaimLoop
in a small wrapper that defers wg.Done(), and ensure Run waits for wg.Wait()
after ctx.Done() so in-flight reclaim operations using sem (the channel made
with clusterMaxWorkers) finish before returning; alternatively add wg.Done()
inside the top of reclaimLoop and call wg.Wait() in Run after cancellation.
 assert.Contains(t, buf.String(), tt.wantLog)
 if tt.wantAcked {
 	assert.NotEmpty(t, fq.acked, "expected message to be acked")
 }
 if tt.wantNacked {
 	assert.NotEmpty(t, fq.nacked, "expected message to be nacked")
 }
Assert the opposite ack path stays empty too.
These cases only verify that the expected side was touched. A worker that mistakenly Nacks and then Acks the same durable message would still pass the “success” and poison-message cases. Since the queue transition is the behavior under test, please assert the non-expected slice stays empty as well.
🧪 Suggested assertion upgrade
assert.Contains(t, buf.String(), tt.wantLog)
if tt.wantAcked {
assert.NotEmpty(t, fq.acked, "expected message to be acked")
+ assert.Empty(t, fq.nacked, "did not expect message to be nacked")
}
if tt.wantNacked {
assert.NotEmpty(t, fq.nacked, "expected message to be nacked")
+ assert.Empty(t, fq.acked, "did not expect message to be acked")
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/workers/provision_worker_test.go` around lines 166 - 172, The tests
currently only assert the expected side was touched (using
tt.wantAcked/tt.wantNacked and fq.acked/fq.nacked) but don’t assert the opposite
side remained untouched; update the assertions so that when tt.wantAcked is true
you also assert fq.nacked is empty, when tt.wantNacked is true you also assert
fq.acked is empty, and when neither flag is set assert both fq.acked and
fq.nacked are empty; keep the existing buf.String() log assertion and use the
same fq.acked/fq.nacked symbols to locate the checks to modify.
Superseded by upstream PR poyrazK#101 |
Summary
Implements comprehensive High Availability for the control plane to survive single-node failures without outages or duplicate destructive actions.
Changes
Phase 1-2: Topology & Leader Election
- Fix `APP_ROLE` → `ROLE` env var mismatch
- Postgres advisory-lock leader election (`PgLeaderElector`)
- `LeaderGuard` wrapper for 11 singleton workers

Phase 3: Durable Job Execution
Phase 4: Failure Isolation
Phase 5: Testing
SLO Targets Met
Test Results
- go vet

Files Changed
34 new files, 12 modified files across:
- internal/core/ports/ - new interfaces
- internal/platform/ - resilience primitives + wrappers
- internal/repositories/ - implementations
- internal/workers/ - HA-enabled workers
- internal/drills/ - HA validation tests
- cmd/api/ - wiring changes

Summary by CodeRabbit
New Features
Reliability Improvements
Tests